package com.shujia;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Demo6MRReadAndWriteHBase {
    // Read the HBase 'students' table, count the students per class, and write the result back to the HBase table 'mr_res'
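    // TableMapper fixes the input key/value types to <ImmutableBytesWritable, Result>,
    // so the generic parameters below only declare the map output types (Text, IntWritable).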
    public static class ReadHBaseMapper extends TableMapper<Text, IntWritable> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Mapper<ImmutableBytesWritable, Result, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            // key: the rowkey of the current row; value: the full Result (one row of the table)
            String rowkey = Bytes.toString(key.copyBytes()); // student rowkey (not needed for the count)
            // Read the class name from column family "info", qualifier "clazz"
            // (assumes every row carries this column)
            String clazz = Bytes.toString(value.getValue(Bytes.toBytes("info"), Bytes.toBytes("clazz")));
            // Emit (class, 1); the reducer sums these up per class
            context.write(new Text(clazz), new IntWritable(1));
        }
    }

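    // TableReducer fixes the reduce output value type to Mutation, so a Put (a Mutation
    // subclass) can be written directly; the third generic parameter is the output key type.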
    public static class WriteHBaseReducer extends TableReducer<Text, IntWritable, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, NullWritable, Mutation>.Context context) throws IOException, InterruptedException {
            // Sum the 1s emitted by the mapper for this class
            int cnt = 0;
            for (IntWritable value : values) {
                cnt += value.get();
            }
            // Use the class name as the rowkey; go through toString() because
            // Text.getBytes() may include stale bytes beyond getLength()
            Put put = new Put(Bytes.toBytes(key.toString()));
            // Store the count in cf1:num as a 4-byte int
            put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("num"), Bytes.toBytes(cnt));

            context.write(NullWritable.get(), put);
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // Standard HBase configuration; the ZooKeeper quorum must match the cluster
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master,node1,node2");

        Job job = Job.getInstance(conf);
        job.setJobName("Demo6MRReadAndWriteHBase");
        job.setJarByClass(Demo6MRReadAndWriteHBase.class);


        // Configure the map side: scan the 'students' table with ReadHBaseMapper
        TableMapReduceUtil.initTableMapperJob(
                "students",
                new Scan(),
                ReadHBaseMapper.class,
                Text.class,
                IntWritable.class,
                job
        );
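        // Optional tuning (illustrative values, not part of the original job): for large
        // tables it is common to pass a pre-configured Scan instead of new Scan(), e.g.
        //   Scan scan = new Scan();
        //   scan.setCaching(500);        // fetch more rows per RPC round trip
        //   scan.setCacheBlocks(false);  // avoid churning the region server block cache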

        // Configure the reduce side: write the resulting Puts into the 'mr_res' table
        TableMapReduceUtil.initTableReducerJob(
                "mr_res",
                WriteHBaseReducer.class,
                job
        );
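        // initTableReducerJob wires up TableOutputFormat internally, so no
        // FileOutputFormat output path has to be set for this job.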


        // Submit the job and exit with a non-zero status if it fails
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
    /**
     * Preparation and submission:
     *   HBase shell:  create 'mr_res','cf1'
     *   Command line: hadoop jar HBaseJavaAPI12-1.0-jar-with-dependencies.jar com.shujia.Demo6MRReadAndWriteHBase
     */
}
